package com.jcbase.core.plugin.process;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jfinal.plugin.IPlugin;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
 * Processor plugin: starts one background worker per configured Redis queue.
 * Each worker repeatedly pops entries from its queue with {@code BRPOP} and
 * hands them to the configured {@link RedisProcesser}.
 *
 * <p>Configure the plugin through the three-argument constructor or the fluent
 * setters before calling {@link #start()}. {@link #stop()} asks the workers to
 * finish and waits for them to terminate.
 *
 * @category Processor plugin
 * @date 2015-12-14 22:39:18
 */
public class ProcessPlugin implements IPlugin {
	private static final Logger logger = LoggerFactory.getLogger(ProcessPlugin.class);

	/** Timeout, in seconds, passed to {@code BRPOP} so workers regularly re-check the stop conditions. */
	private static final int POP_TIMEOUT_SECONDS = 1;
	/** Pause, in milliseconds, after a failed poll before the worker retries. */
	private static final long ERROR_BACKOFF_MILLIS = 3000L;
	/** Time, in seconds, granted to each phase of the executor shutdown. */
	private static final long SHUTDOWN_WAIT_SECONDS = 60L;

	private final ExecutorService executorService = Executors.newCachedThreadPool();

	private JedisPool redisPool;
	private List<String> queueNames;
	private RedisProcesser processer;
	/** Pause between two polls in milliseconds; defaults to 0 (no pause). */
	private volatile long sleepTime = 0;
	/** Set by {@link #start()} and cleared by {@link #stop()}; workers leave their loop once it is false. */
	private volatile boolean running = false;

	/**
	 * Sets the connection pool the workers borrow their Redis connection from.
	 *
	 * @param redisPool the Jedis pool to use
	 * @return this plugin, for chaining
	 */
	public ProcessPlugin setRedisPool(JedisPool redisPool) {
		this.redisPool = redisPool;
		return this;
	}

	/**
	 * Sets the names of the queues to listen on; one worker is started per name.
	 *
	 * @param queueNames the Redis list keys to poll
	 * @return this plugin, for chaining
	 */
	public ProcessPlugin setQueueNames(List<String> queueNames) {
		this.queueNames = queueNames;
		return this;
	}

	/**
	 * Sets the processor that receives every entry popped from a queue.
	 *
	 * @param processer the callback invoked with the queue name and the entry content
	 * @return this plugin, for chaining
	 */
	public ProcessPlugin setProcesser(RedisProcesser processer) {
		this.processer = processer;
		return this;
	}

	/**
	 * Sets the pause a worker takes between two polls.
	 *
	 * @param sleepTime pause in milliseconds; 0 disables the pause
	 * @return this plugin, for chaining
	 * @throws IllegalArgumentException if {@code sleepTime} is negative
	 */
	public ProcessPlugin setSleepTime(long sleepTime) {
		if (sleepTime < 0) {
			throw new IllegalArgumentException("sleepTime must not be negative: " + sleepTime);
		}
		this.sleepTime = sleepTime;
		return this;
	}

	/** Creates an unconfigured plugin; use the setters before calling {@link #start()}. */
	public ProcessPlugin() {
	}

	/**
	 * Creates a fully configured plugin.
	 *
	 * @param redisPool  the Jedis pool the workers borrow connections from
	 * @param queueNames the Redis list keys to poll, one worker per key
	 * @param processer  the callback invoked for every popped entry
	 */
	public ProcessPlugin(JedisPool redisPool, List<String> queueNames, RedisProcesser processer) {
		this.redisPool = redisPool;
		this.queueNames = queueNames;
		this.processer = processer;
	}

	/**
	 * Starts one worker per configured queue.
	 *
	 * @return {@code true} once all workers have been submitted; {@code false}
	 *         (after logging the reason) when the pool, the queue names or the
	 *         processor have not been configured
	 */
	@Override
	public boolean start() {
		if (redisPool == null || queueNames == null || processer == null) {
			logger.error("ProcessPlugin is not fully configured: redisPool={}, queueNames={}, processer={}",
					redisPool, queueNames, processer);
			return false;
		}

		running = true;
		logger.info("监听器已经启动");
		for (String queueName : queueNames) {
			logger.info("队列名称为：{}", queueName);
			executorService.execute(new Worker(queueName));
		}
		return true;
	}

	/**
	 * Signals every worker to stop and waits for the executor to terminate.
	 *
	 * @return always {@code true}
	 */
	@Override
	public boolean stop() {
		// shutdown() alone does not end already-running tasks, so the workers
		// are told explicitly to leave their loop after the current poll.
		running = false;
		shutdownAndAwaitTermination(executorService);
		return true;
	}

	/** Polls a single queue until the plugin is stopped or the thread is interrupted. */
	private class Worker implements Runnable {
		private final String queueName;

		public Worker(String queueName) {
			this.queueName = queueName;
		}

		@Override
		public void run() {
			Jedis jedis = null;
			try {
				while (running && !Thread.currentThread().isInterrupted()) {
					boolean failed = false;
					try {
						if (jedis == null) {
							jedis = redisPool.getResource();
						}

						// A non-null reply carries two elements: the queue name and the popped content.
						List<String> datas = jedis.brpop(POP_TIMEOUT_SECONDS, queueName);
						if ((datas != null) && (datas.size() == 2)) {
							process(datas.get(0), datas.get(1));
						}

						long pause = sleepTime;
						if (pause > 0) {
							Thread.sleep(pause);
						}
					} catch (InterruptedException e) {
						// Restore the flag so the loop condition ends the worker.
						Thread.currentThread().interrupt();
					} catch (JedisConnectionException e) {
						failed = true;
						logger.error(e.toString(), e);
					} catch (Exception e) {
						// Any other failure is handled like a connection failure:
						// drop the connection, back off and retry.
						failed = true;
						logger.error(e.toString(), e);
					}

					if (failed) {
						closeQuietly(jedis);
						jedis = null;
						if (running) {
							backOff();
						}
					}
				}
			} finally {
				// Always hand the connection back to the pool when the worker ends.
				closeQuietly(jedis);
				logger.info("Redis queue listener stopped: queueName={}", queueName);
			}
		}

		/** Passes one popped entry to the processor; a processor failure is logged and does not stop the worker. */
		private void process(String name, String content) {
			try {
				logger.debug("从redis队列中取到的数据为: queueName={}, content={}", name, content);
				processer.doProcess(name, content);
			} catch (Exception e) {
				logger.error("从redis队列处理消息出现异常, queueName: " + name + ", content: " + content
						+ ", 异常: " + e.toString(), e);
			}
		}

		/** Sleeps for the error back-off period; an interrupt is preserved for the main loop to observe. */
		private void backOff() {
			logger.error("Redis监听器异常, 监听器将休眠3秒之后再重试...");
			try {
				Thread.sleep(ERROR_BACKOFF_MILLIS);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}

		/** Closes the connection if there is one; a failure while closing is logged instead of killing the worker. */
		private void closeQuietly(Jedis connection) {
			if (connection == null) {
				return;
			}
			try {
				connection.close();
			} catch (Exception e) {
				logger.error(e.toString(), e);
			}
		}
	}

	/**
	 * Shuts the pool down in two phases: an orderly shutdown first, then a
	 * forced one (interrupting the workers) if the tasks have not finished in time.
	 *
	 * @param pool the executor to shut down
	 */
	private void shutdownAndAwaitTermination(ExecutorService pool) {
		pool.shutdown();
		try {
			if (!pool.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
				pool.shutdownNow();
				if (!pool.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
					logger.error("Pool did not terminate");
				}
			}
		} catch (InterruptedException ie) {
			// The calling thread was interrupted while waiting: force the shutdown
			// and keep the interrupt status for the caller.
			pool.shutdownNow();
			Thread.currentThread().interrupt();
		}
	}
}
